package com.hbase.dao;

import java.io.IOException;
import java.util.Collection;
import java.util.List;
import java.util.Map;

import org.apache.commons.lang.StringUtils;
import org.apache.hadoop.hbase.ClusterStatus;
import org.apache.hadoop.hbase.HColumnDescriptor;
import org.apache.hadoop.hbase.HServerLoad;
import org.apache.hadoop.hbase.HTableDescriptor;
import org.apache.hadoop.hbase.MasterNotRunningException;
import org.apache.hadoop.hbase.ServerName;
import org.apache.hadoop.hbase.HServerLoad.RegionLoad;
import org.apache.hadoop.hbase.ZooKeeperConnectionException;
import org.apache.hadoop.hbase.client.HBaseAdmin;
import org.apache.hadoop.hbase.client.HTableInterface;
import org.apache.hadoop.hbase.util.Bytes;
import org.apache.log4j.Logger;

import com.google.common.collect.Lists;
import com.google.common.collect.Maps;
import com.hbase.dao.entity.clusters.HBaseClustersStatusEntity;
import com.hbase.dao.entity.clusters.HBaseHServerLoadEntity;
import com.hbase.dao.entity.clusters.HBaseRegionLoadEntity;
import com.hbase.dao.entity.htable.HBaseTableDescribesEntity;
import com.hbase.dao.entity.htable.HBaseTableFamilyDescribesEntity;
import com.hbase.dao.exception.HBaseDaoException;
import com.hbase.dao.pool.HTablePoolEngine;
import com.hbase.dao.util.CheckUtil;

/**
 * HBase utility class.
 * 
 * @author yangshenhui
 * @version 1.0
 */
public class HBaseHelper {
	/** Shared log4j logger for this class; final so the reference cannot be reassigned. */
	private static final Logger logger = Logger.getLogger(HBaseHelper.class);

	/**
	 * Creates an HBase table from the given description.
	 * <p>
	 * The pre-split strategy is picked in this order: the explicit split row
	 * keys if the entity carries any; otherwise the start key / end key /
	 * region number triple if both keys are non-empty and the region number is
	 * non-zero; otherwise the table is created without pre-split regions.
	 * 
	 * @param hBaseTableDescribesEntity
	 *            description of the HBase table (name, families, options)
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the table already exists, {@code true} after
	 *         the table has been created
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static boolean create(
			HBaseTableDescribesEntity hBaseTableDescribesEntity,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(hBaseInstanceName);
		final String tableName = hBaseTableDescribesEntity.getTableName();
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyMap(hBaseTableDescribesEntity.getFamilyMap());
		logger.info(String.format("开始创建HBase表[%s].", tableName));

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (admin.tableExists(tableName)) {
				logger.warn(String.format("HBase表[%s]已经存在不能创建.", tableName));
				return false;
			}

			if (CheckUtil.isNotEmpty(hBaseTableDescribesEntity.getSplitRowKeys())) {
				// Pre-split on the explicitly supplied row keys.
				logger.info(String.format("通过指定主键数组%s创建HBase表以及预分区.",
						hBaseTableDescribesEntity.getSplitRowKeyList()));
				admin.createTable(
						buildTableDescriptor(hBaseTableDescribesEntity),
						hBaseTableDescribesEntity.getSplitRowKeys());
			} else if (StringUtils.isNotEmpty(hBaseTableDescribesEntity.getStartRowKey())
					&& StringUtils.isNotEmpty(hBaseTableDescribesEntity.getEndRowKey())
					&& hBaseTableDescribesEntity.getRegionNumber() != 0) {
				// Pre-split between a start key and an end key into N regions.
				logger.info(String.format(
						"通过指定开始的主键[%s]结束的主键[%s]以及预建region的数量[%s]创建HBase表以及预分区.",
						hBaseTableDescribesEntity.getStartRowKey(),
						hBaseTableDescribesEntity.getEndRowKey(),
						hBaseTableDescribesEntity.getRegionNumber()));
				admin.createTable(
						buildTableDescriptor(hBaseTableDescribesEntity),
						Bytes.toBytes(hBaseTableDescribesEntity.getStartRowKey()),
						Bytes.toBytes(hBaseTableDescribesEntity.getEndRowKey()),
						hBaseTableDescribesEntity.getRegionNumber());
			} else {
				// No pre-splitting requested.
				logger.info("创建HBase表不预建分区.");
				admin.createTable(buildTableDescriptor(hBaseTableDescribesEntity));
			}

			logger.info(String.format("创建HBase表%s.", hBaseTableDescribesEntity));
			logger.info(String.format("结束创建HBase表[%s].", tableName));
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Builds the table descriptor for the given table description by
	 * forwarding its name, families and table options to the private
	 * descriptor factory.
	 */
	private static HTableDescriptor buildTableDescriptor(
			HBaseTableDescribesEntity entity) {
		return create(entity.getTableName(), entity.getFamilyMap(),
				entity.getMaxFileSize(), entity.getMemstoreFlushSize(),
				entity.isReadOnly(), entity.isDeferredLogFlush());
	}

	/**
	 * Builds an {@link HTableDescriptor} with one column family per entry of
	 * {@code familyMap}.
	 * <p>
	 * Only "set" values are applied: numeric options must be greater than zero,
	 * object options must be non-null and boolean options must be {@code true};
	 * anything else leaves the descriptor's own default untouched. A family
	 * whose map value is null is added with no options applied.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param familyMap
	 *            family name to family options
	 * @param maxFileSize
	 *            max file size, applied when greater than zero
	 * @param memstoreFlushSize
	 *            memstore flush size, applied when greater than zero
	 * @param isReadOnly
	 *            marks the table read-only when {@code true}
	 * @param isDeferredLogFlush
	 *            turns on deferred log flush when {@code true}
	 * @return the populated table descriptor
	 */
	private static HTableDescriptor create(String tableName,
			Map<String, HBaseTableFamilyDescribesEntity> familyMap,
			long maxFileSize, long memstoreFlushSize, boolean isReadOnly,
			boolean isDeferredLogFlush) {
		HTableDescriptor tableDescriptor = new HTableDescriptor(tableName);
		if (maxFileSize > 0) {
			tableDescriptor.setMaxFileSize(maxFileSize);
		}
		if (memstoreFlushSize > 0) {
			tableDescriptor.setMemStoreFlushSize(memstoreFlushSize);
		}
		if (isReadOnly) {
			tableDescriptor.setReadOnly(true);
		}
		if (isDeferredLogFlush) {
			tableDescriptor.setDeferredLogFlush(true);
		}

		for (Map.Entry<String, HBaseTableFamilyDescribesEntity> entry : familyMap
				.entrySet()) {
			HColumnDescriptor column = new HColumnDescriptor(entry.getKey());
			HBaseTableFamilyDescribesEntity settings = entry.getValue();
			if (!CheckUtil.isNotNull(settings)) {
				// No options supplied for this family: add it as constructed.
				tableDescriptor.addFamily(column);
				continue;
			}
			if (settings.getTimeToLive() > 0) {
				column.setTimeToLive(settings.getTimeToLive());
			}
			if (settings.getBloomType() != null) {
				column.setBloomFilterType(settings.getBloomType());
			}
			if (settings.getCompression() != null) {
				column.setCompressionType(settings.getCompression());
			}
			if (settings.getCompactionCompression() != null) {
				column.setCompactionCompressionType(settings
						.getCompactionCompression());
			}
			if (settings.getMaxVersions() > 0) {
				column.setMaxVersions(settings.getMaxVersions());
			}
			if (settings.isInMemory()) {
				column.setInMemory(true);
			}
			if (settings.getBlockSize() > 0) {
				column.setBlocksize(settings.getBlockSize());
			}
			if (settings.isBlockCacheEnabled()) {
				column.setBlockCacheEnabled(true);
			}
			tableDescriptor.addFamily(column);
		}
		return tableDescriptor;
	}

	/**
	 * Drops a table, disabling it first when it is not already disabled.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the table does not exist, {@code true} after
	 *         the table has been deleted
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static boolean drop(String tableName, String hBaseInstanceName)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始删除HBase表[%s].", tableName));

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			final boolean exists = admin.tableExists(tableName);
			if (!exists) {
				logger.warn(String.format("HBase表[%s]不存在不能进行删除操作.", tableName));
				return false;
			}

			final byte[] rawTableName = Bytes.toBytes(tableName);
			final boolean alreadyDisabled = admin.isTableDisabled(tableName);
			if (!alreadyDisabled) {
				logger.info(String.format("停用HBase表[%s].", tableName));
				admin.disableTable(rawTableName);
			}
			admin.deleteTable(rawTableName);

			logger.info(String.format("删除HBase表[%s].", tableName));
			logger.info(String.format("结束删除HBase表[%s].", tableName));
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Reads a table's descriptor and converts it into an
	 * {@link HBaseTableDescribesEntity}, with one family entry per column
	 * family.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return the table description, or {@code null} if the table does not
	 *         exist
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static HBaseTableDescribesEntity describe(String tableName,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始获取HBase表[%s]信息.", tableName));

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (!admin.tableExists(tableName)) {
				logger.warn(String.format("HBase表[%s]不存在不能获取信息.", tableName));
				return null;
			}

			HBaseTableDescribesEntity description = new HBaseTableDescribesEntity();
			HTableDescriptor tableDescriptor = admin.getTableDescriptor(Bytes
					.toBytes(tableName));

			// Table-level options.
			description.setMaxFileSize(tableDescriptor.getMaxFileSize());
			description.setMemstoreFlushSize(tableDescriptor
					.getMemStoreFlushSize());
			description.setReadOnly(tableDescriptor.isReadOnly());
			description.setDeferredLogFlush(tableDescriptor
					.isDeferredLogFlush());

			// Per-family options, keyed by family name.
			// NOTE(review): the in-memory flag is not copied here although the
			// create/modify paths read isInMemory() from the family entity;
			// confirm whether that omission is intended.
			Map<String, HBaseTableFamilyDescribesEntity> families = Maps
					.newHashMap();
			for (HColumnDescriptor column : tableDescriptor.getFamilies()) {
				HBaseTableFamilyDescribesEntity family = new HBaseTableFamilyDescribesEntity();
				family.setTimeToLive(column.getTimeToLive());
				family.setBloomType(column.getBloomFilterType());
				family.setCompression(column.getCompression());
				family.setCompactionCompression(column
						.getCompactionCompression());
				family.setMaxVersions(column.getMaxVersions());
				family.setBlockSize(column.getBlocksize());
				family.setBlockCacheEnabled(column.isBlockCacheEnabled());
				families.put(column.getNameAsString(), family);
			}
			description.setFamilyMap(families);
			description.setTableName(tableName);

			logger.info(String.format("结束获取HBase表[%s]信息.", tableName));
			return description;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Enables a table unless HBase already reports it as enabled.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the table does not exist, {@code true}
	 *         otherwise
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static boolean enable(String tableName, String hBaseInstanceName)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始启用HBase表[%s].", tableName));

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (!admin.tableExists(tableName)) {
				logger.warn(String.format("HBase表[%s]不存在不能启用.", tableName));
				return false;
			}

			final boolean alreadyEnabled = admin.isTableEnabled(tableName);
			if (!alreadyEnabled) {
				admin.enableTable(tableName);
			}

			// Logged whether or not an enable call was actually needed.
			logger.info(String.format("启用HBase表[%s].", tableName));
			logger.info(String.format("结束启用HBase表[%s].", tableName));
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Disables a table unless HBase already reports it as disabled.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the table does not exist, {@code true}
	 *         otherwise
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static boolean disable(String tableName, String hBaseInstanceName)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始停用HBase表[%s].", tableName));

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (!admin.tableExists(tableName)) {
				logger.warn(String.format("HBase表[%s]不存在不能停用.", tableName));
				return false;
			}

			final boolean alreadyDisabled = admin.isTableDisabled(tableName);
			if (!alreadyDisabled) {
				admin.disableTable(tableName);
			}

			// Logged whether or not a disable call was actually needed.
			logger.info(String.format("停用HBase表[%s].", tableName));
			logger.info(String.format("结束停用HBase表[%s].", tableName));
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Lists the names of all tables returned by {@code HBaseAdmin.listTables()}.
	 * 
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return the table names, in the order the descriptors were returned
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static List<String> list(String hBaseInstanceName)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info("开始获取HBase的所有表名.");

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			List<String> names = Lists.newArrayList();
			HTableDescriptor[] descriptors = admin.listTables();
			for (int i = 0; i < descriptors.length; i++) {
				names.add(descriptors[i].getNameAsString());
			}
			logger.info("结束获取HBase的所有表名.");
			return names;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Copies the structure of an existing table to a new table: the source
	 * table's descriptor is renamed and used to create the new table. The new
	 * table is created from the descriptor alone, i.e. without pre-split
	 * regions.
	 * 
	 * @param oldTableName
	 *            name of the existing source table
	 * @param newTableName
	 *            name of the table to create
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the source table does not exist or the target
	 *         table already exists, {@code true} after the new table has been
	 *         created
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static boolean copy(String oldTableName, String newTableName,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(oldTableName);
		CheckUtil.checkEmptyString(newTableName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始复制HBase表结构从[%s]到[%s].", oldTableName,
				newTableName));

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));

			final boolean sourceExists = admin.tableExists(oldTableName);
			if (!sourceExists) {
				logger.warn(String
						.format("HBase表[%s]不存在不能复制表结构.", oldTableName));
				return false;
			}
			final boolean targetExists = admin.tableExists(newTableName);
			if (targetExists) {
				logger.warn(String
						.format("HBase表[%s]已经存在请选择新表名.", newTableName));
				return false;
			}

			// Reuse the source descriptor under the new name.
			HTableDescriptor clonedDescriptor = admin.getTableDescriptor(Bytes
					.toBytes(oldTableName));
			clonedDescriptor.setName(Bytes.toBytes(newTableName));
			admin.createTable(clonedDescriptor);

			logger.info(String.format("复制HBase表结构从[%s]到[%s].", oldTableName,
					newTableName));
			logger.info(String.format("结束复制HBase表结构从[%s]到[%s].", oldTableName,
					newTableName));
			return true;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Deletes a column family from a table. The table is disabled before the
	 * deletion and enabled again afterwards.
	 * <p>
	 * The re-enable step runs in a {@code finally} block so that a failing
	 * {@code deleteColumn} call does not leave the table disabled.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param family
	 *            name of the column family to delete
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the table does not exist, {@code true} after
	 *         the family has been deleted
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin
	 *             calls, or if disabling/enabling the table fails
	 */
	public static boolean removeFamily(String tableName, String family,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		CheckUtil.checkEmptyString(family);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始删除HBase表[%s]的列族[%s].", tableName, family));
		HBaseAdmin hBaseAdmin = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (!hBaseAdmin.tableExists(tableName)) {
				logger.warn(String.format("HBase表[%s]不存在不能删除列族.", tableName));
				return false;
			}
			disable(tableName, hBaseInstanceName);
			try {
				hBaseAdmin.deleteColumn(tableName, family);
				logger.info(String.format("删除HBase表[%s]的列族[%s].", tableName,
						family));
			} finally {
				// Always bring the table back online, even when the deletion
				// failed; otherwise an error would leave it disabled.
				enable(tableName, hBaseInstanceName);
			}
			logger.info(String.format("结束删除HBase表[%s]的列族[%s].", tableName,
					family));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(hBaseAdmin);
		}
		return true;
	}

	/**
	 * Modifies an existing table: applies the table-level options of the
	 * description to the current descriptor, updates every listed column
	 * family (adding families that do not exist yet) and submits the result
	 * through {@code HBaseAdmin.modifyTable}. The table is disabled for the
	 * duration of the change and enabled again afterwards.
	 * <p>
	 * Only "set" values are applied: numeric options must be greater than zero,
	 * object options must be non-null and boolean options must be {@code true}.
	 * A family whose map value is null gets no options applied, matching how
	 * table creation treats null entries.
	 * <p>
	 * The re-enable step runs in a {@code finally} block so that a failed
	 * modification does not leave the table disabled.
	 * 
	 * @param hBaseTableDescribesEntity
	 *            description of the HBase table (name, families, options)
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return {@code false} if the table does not exist, {@code true} after
	 *         the table has been modified
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin
	 *             calls, or if disabling/enabling the table fails
	 */
	public static boolean modify(
			HBaseTableDescribesEntity hBaseTableDescribesEntity,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(hBaseInstanceName);
		CheckUtil.checkEmptyString(hBaseTableDescribesEntity.getTableName());
		CheckUtil.checkEmptyMap(hBaseTableDescribesEntity.getFamilyMap());
		String name = hBaseTableDescribesEntity.getTableName();
		logger.info(String.format("开始修改HBase表[%s]信息.", name));
		HBaseAdmin hBaseAdmin = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (!hBaseAdmin.tableExists(name)) {
				// Report the reason, as the other table operations do.
				logger.warn(String.format("HBase表[%s]不存在不能修改.", name));
				return false;
			}
			byte[] tableName = Bytes.toBytes(name);
			disable(name, hBaseInstanceName);
			try {
				HTableDescriptor hTableDescriptor = hBaseAdmin
						.getTableDescriptor(tableName);
				if (hBaseTableDescribesEntity.getMaxFileSize() > 0) {
					hTableDescriptor.setMaxFileSize(hBaseTableDescribesEntity
							.getMaxFileSize());
				}
				if (hBaseTableDescribesEntity.getMemstoreFlushSize() > 0) {
					hTableDescriptor
							.setMemStoreFlushSize(hBaseTableDescribesEntity
									.getMemstoreFlushSize());
				}
				if (hBaseTableDescribesEntity.isReadOnly()) {
					hTableDescriptor.setReadOnly(true);
				}
				if (hBaseTableDescribesEntity.isDeferredLogFlush()) {
					hTableDescriptor.setDeferredLogFlush(true);
				}
				for (Map.Entry<String, HBaseTableFamilyDescribesEntity> entry : hBaseTableDescribesEntity
						.getFamilyMap().entrySet()) {
					String family = entry.getKey();
					HColumnDescriptor hColumnDescriptor = hTableDescriptor
							.getFamily(Bytes.toBytes(family));
					if (CheckUtil.isNull(hColumnDescriptor)) {
						hColumnDescriptor = new HColumnDescriptor(family);
						logger.info(String.format("列族[%s]不存在则添加列族.", family));
					}
					// A null value means "no options for this family"; skip
					// the option copy instead of failing with an NPE.
					if (CheckUtil.isNotNull(entry.getValue())) {
						applyFamilySettings(hColumnDescriptor, entry.getValue());
					}
					hTableDescriptor.addFamily(hColumnDescriptor);
				}
				hBaseAdmin.modifyTable(tableName, hTableDescriptor);
			} finally {
				// Always bring the table back online, even when the
				// modification failed; otherwise an error would leave it
				// disabled.
				enable(name, hBaseInstanceName);
			}
			logger.info(String.format("修改HBase表[%s]信息.",
					hBaseTableDescribesEntity));
			logger.info(String.format("结束修改HBase表[%s]信息.", name));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(hBaseAdmin);
		}
		return true;
	}

	/**
	 * Copies every "set" option (positive number, non-null object, true flag)
	 * from the family description onto the column descriptor.
	 */
	private static void applyFamilySettings(HColumnDescriptor column,
			HBaseTableFamilyDescribesEntity settings) {
		if (settings.getTimeToLive() > 0) {
			column.setTimeToLive(settings.getTimeToLive());
		}
		if (settings.getBloomType() != null) {
			column.setBloomFilterType(settings.getBloomType());
		}
		if (settings.getCompression() != null) {
			column.setCompressionType(settings.getCompression());
		}
		if (settings.getCompactionCompression() != null) {
			column.setCompactionCompressionType(settings
					.getCompactionCompression());
		}
		if (settings.getMaxVersions() > 0) {
			column.setMaxVersions(settings.getMaxVersions());
		}
		if (settings.isInMemory()) {
			column.setInMemory(true);
		}
		if (settings.getBlockSize() > 0) {
			column.setBlocksize(settings.getBlockSize());
		}
		if (settings.isBlockCacheEnabled()) {
			column.setBlockCacheEnabled(true);
		}
	}

	/**
	 * Explicitly flushes the memStore data of a table or region to disk.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @throws HBaseDaoException
	 *             if the flush fails with an {@link IOException} or the
	 *             calling thread is interrupted; in the latter case the
	 *             thread's interrupt flag is set again before throwing
	 */
	public static void flush(String tableNameOrRegionName,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableNameOrRegionName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始将[%s]的memStore中的数据刷写到磁盘.",
				tableNameOrRegionName));
		HBaseAdmin hBaseAdmin = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			hBaseAdmin.flush(tableNameOrRegionName);
			logger.info(String.format("结束将[%s]的memStore中的数据刷写到磁盘.",
					tableNameOrRegionName));
		} catch (InterruptedException e) {
			// Wrapping the exception would otherwise hide the interruption
			// from callers further up the stack.
			Thread.currentThread().interrupt();
			throw new HBaseDaoException(e);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(hBaseAdmin);
		}
	}

	/**
	 * Requests a compaction of a table or region, optionally restricted to one
	 * column family.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param family
	 *            column family name; when null or empty the whole table or
	 *            region is compacted
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @throws HBaseDaoException
	 *             if the request fails with an {@link IOException} or the
	 *             calling thread is interrupted; in the latter case the
	 *             thread's interrupt flag is set again before throwing
	 */
	public static void compact(String tableNameOrRegionName, String family,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableNameOrRegionName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info("开始合并.");
		HBaseAdmin hBaseAdmin = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (StringUtils.isEmpty(family)) {
				logger.info(String.format("合并[%s].", tableNameOrRegionName));
				hBaseAdmin.compact(tableNameOrRegionName);
			} else {
				logger.info(String.format("合并[%s][%s].", tableNameOrRegionName,
						family));
				hBaseAdmin.compact(tableNameOrRegionName, family);
			}
			logger.info("结束合并.");
		} catch (InterruptedException e) {
			// Wrapping the exception would otherwise hide the interruption
			// from callers further up the stack.
			Thread.currentThread().interrupt();
			throw new HBaseDaoException(e);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(hBaseAdmin);
		}
	}

	/**
	 * Requests a major compaction of a table or region, optionally restricted
	 * to one column family.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param family
	 *            column family name; when null or empty the whole table or
	 *            region is major-compacted
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @throws HBaseDaoException
	 *             if the request fails with an {@link IOException} or the
	 *             calling thread is interrupted; in the latter case the
	 *             thread's interrupt flag is set again before throwing
	 */
	public static void majorCompact(String tableNameOrRegionName,
			String family, String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableNameOrRegionName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info("开始major合并.");
		HBaseAdmin hBaseAdmin = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (StringUtils.isEmpty(family)) {
				logger.info(String
						.format("major合并[%s].", tableNameOrRegionName));
				hBaseAdmin.majorCompact(tableNameOrRegionName);
			} else {
				logger.info(String.format("major合并[%s][%s].",
						tableNameOrRegionName, family));
				hBaseAdmin.majorCompact(tableNameOrRegionName, family);
			}
			logger.info("结束major合并.");
		} catch (InterruptedException e) {
			// Wrapping the exception would otherwise hide the interruption
			// from callers further up the stack.
			Thread.currentThread().interrupt();
			throw new HBaseDaoException(e);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(hBaseAdmin);
		}
	}

	/**
	 * Splits a single region or a whole table.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param rowKey
	 *            row key to split at (the region containing this key is split
	 *            at it); when null or empty the split is requested without an
	 *            explicit split point
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @throws HBaseDaoException
	 *             if the request fails with an {@link IOException} or the
	 *             calling thread is interrupted; in the latter case the
	 *             thread's interrupt flag is set again before throwing
	 */
	public static void split(String tableNameOrRegionName, String rowKey,
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableNameOrRegionName);
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info("开始拆分region.");
		HBaseAdmin hBaseAdmin = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (StringUtils.isEmpty(rowKey)) {
				logger.info(String.format("拆分region[%s].",
						tableNameOrRegionName));
				hBaseAdmin.split(tableNameOrRegionName);
			} else {
				logger.info(String.format("拆分region[%s][%s].",
						tableNameOrRegionName, rowKey));
				hBaseAdmin.split(tableNameOrRegionName, rowKey);
			}
			logger.info("结束拆分region.");
		} catch (InterruptedException e) {
			// Wrapping the exception would otherwise hide the interruption
			// from callers further up the stack.
			Thread.currentThread().interrupt();
			throw new HBaseDaoException(e);
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(hBaseAdmin);
		}
	}

	/**
	 * Invokes the HBase balancer.
	 * 
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return the value returned by {@code HBaseAdmin.balancer()}
	 * @throws HBaseDaoException
	 *             if the master is not running, the ZooKeeper connection
	 *             fails, or closing the admin handle fails
	 */
	public static boolean balancer(String hBaseInstanceName)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info("开始负载均衡.");

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			final boolean balancerResult = admin.balancer();
			logger.info("结束负载均衡.");
			return balancerResult;
		} catch (MasterNotRunningException | ZooKeeperConnectionException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Collects the HBase cluster status: cluster-wide figures, one load entry
	 * per server (keyed by server name) and, inside each server entry, one
	 * load entry per region (keyed by region name).
	 * 
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine.getHBaseConfiguration}
	 *            to obtain the HBase configuration
	 * @return the populated cluster status entity
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase admin calls
	 */
	public static HBaseClustersStatusEntity getHBaseClustersStatus(
			String hBaseInstanceName) throws HBaseDaoException {
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info("开始获取HBase集群状态信息.");

		HBaseAdmin admin = null;
		try {
			admin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			HBaseClustersStatusEntity status = new HBaseClustersStatusEntity();
			ClusterStatus clusterStatus = admin.getClusterStatus();

			// Cluster-wide figures.
			status.setServersSize(clusterStatus.getServersSize());
			status.setServersCollection(clusterStatus.getServers());
			status.setDeadServers(clusterStatus.getDeadServers());
			status.setDeadServersCollection(clusterStatus.getDeadServerNames());
			status.setAverageLoad(clusterStatus.getAverageLoad());
			status.setRegionsCount(clusterStatus.getRegionsCount());
			status.setRequestsCount(clusterStatus.getRequestsCount());
			status.sethBaseVersion(clusterStatus.getHBaseVersion());
			status.setClusterId(clusterStatus.getClusterId());
			status.setRegionInTransitionMap(clusterStatus
					.getRegionsInTransition());

			// Per-server load, keyed by the server name stored in the entity.
			Map<String, HBaseHServerLoadEntity> serverLoads = Maps.newHashMap();
			for (ServerName serverName : status.getServersCollection()) {
				HBaseHServerLoadEntity serverLoad = toServerLoadEntity(
						serverName, clusterStatus.getLoad(serverName));
				serverLoads.put(serverLoad.getServerName(), serverLoad);
			}
			status.sethBaseHServerLoadEntityMap(serverLoads);

			logger.info("结束获取HBase集群状态信息.");
			return status;
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			closeHBaseAdmin(admin);
		}
	}

	/**
	 * Converts one server's identity and load figures, including the load of
	 * each of its regions, into an {@link HBaseHServerLoadEntity}.
	 */
	private static HBaseHServerLoadEntity toServerLoadEntity(
			ServerName serverName, HServerLoad load) {
		HBaseHServerLoadEntity entity = new HBaseHServerLoadEntity();
		entity.setHostName(serverName.getHostname());
		entity.setHostAndPort(serverName.getHostAndPort());
		entity.setStartTime(serverName.getStartcode());
		entity.setServerName(serverName.getServerName());
		entity.setPort(serverName.getPort());
		entity.setNumberOfRegions(load.getNumberOfRegions());
		entity.setNumberOfRequests(load.getNumberOfRequests());
		entity.setUsedHeapMB(load.getUsedHeapMB());
		entity.setMaxHeapMB(load.getMaxHeapMB());
		entity.setStoreFiles(load.getStorefiles());
		entity.setStoreFileSizeInMB(load.getStorefileSizeInMB());
		entity.setStoreFileIndexSizeInMB(load.getStorefileIndexSizeInMB());

		// Per-region load, keyed by the region name stored in the entity.
		Map<String, HBaseRegionLoadEntity> regionLoads = Maps.newHashMap();
		for (RegionLoad regionLoad : load.getRegionsLoad().values()) {
			HBaseRegionLoadEntity region = toRegionLoadEntity(regionLoad);
			regionLoads.put(region.getName(), region);
		}
		entity.sethBaseRegionLoadEntityMap(regionLoads);
		return entity;
	}

	/**
	 * Converts one region's load figures into an
	 * {@link HBaseRegionLoadEntity}.
	 */
	private static HBaseRegionLoadEntity toRegionLoadEntity(
			RegionLoad regionLoad) {
		HBaseRegionLoadEntity entity = new HBaseRegionLoadEntity();
		entity.setName(regionLoad.getNameAsString());
		entity.setStores(regionLoad.getStores());
		entity.setStoreFiles(regionLoad.getStorefiles());
		entity.setStoreFileSizeMB(regionLoad.getStorefileSizeMB());
		entity.setStoreFileIndexSizeMB(regionLoad.getStorefileIndexSizeMB());
		entity.setMemStoreSizeMB(regionLoad.getMemStoreSizeMB());
		entity.setRequestsCount(regionLoad.getRequestsCount());
		entity.setReadRequestsCount(regionLoad.getReadRequestsCount());
		entity.setWriteRequestsCount(regionLoad.getWriteRequestsCount());
		return entity;
	}

	/**
	 * Clears all data of a table by reading its descriptor, dropping the
	 * table and creating it again from that descriptor.
	 * <p>
	 * NOTE(review): the table is recreated from the descriptor alone, so any
	 * existing region split points are presumably not preserved; confirm
	 * whether that is acceptable for callers.
	 * 
	 * @param tableName
	 *            name of the table
	 * @param hBaseInstanceName
	 *            name passed to {@code HTablePoolEngine} to obtain the HBase
	 *            configuration and the pooled table
	 * @return {@code false} if the table does not exist, {@code true} after
	 *         the table has been recreated
	 * @throws HBaseDaoException
	 *             if an {@link IOException} is raised by the HBase calls, or
	 *             if dropping the table fails
	 */
	public static boolean truncate(String tableName, String hBaseInstanceName)
			throws HBaseDaoException {
		CheckUtil.checkEmptyString(tableName);
		// Validate the instance name up front, like every other operation.
		CheckUtil.checkEmptyString(hBaseInstanceName);
		logger.info(String.format("开始清空HBase表[%s]数据.", tableName));
		HBaseAdmin hBaseAdmin = null;
		HTableInterface hTableInterface = null;
		try {
			hBaseAdmin = new HBaseAdmin(
					HTablePoolEngine.getHBaseConfiguration(hBaseInstanceName));
			if (!hBaseAdmin.tableExists(tableName)) {
				logger.warn(String.format("HBase表[%s]不存在不能清空.", tableName));
				return false;
			}
			hTableInterface = HTablePoolEngine.getHTable(tableName,
					hBaseInstanceName);
			// Capture the schema before the table disappears.
			HTableDescriptor hTableDescriptor = hTableInterface
					.getTableDescriptor();
			drop(tableName, hBaseInstanceName);
			hBaseAdmin.createTable(hTableDescriptor);
			logger.info(String.format("清空HBase表[%s]数据.", tableName));
			logger.info(String.format("结束清空HBase表[%s]数据.", tableName));
		} catch (IOException e) {
			throw new HBaseDaoException(e);
		} finally {
			try {
				HTablePoolEngine.closeHTable(hTableInterface);
			} finally {
				// Release the admin handle even if returning the table fails.
				closeHBaseAdmin(hBaseAdmin);
			}
		}
		return true;
	}

	/**
	 * Splits a table or region without an explicit split row key. Delegates
	 * to {@link #split(String, String, String)} with a {@code null} row key.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param hBaseInstanceName
	 *            HBase instance name forwarded to the three-argument overload
	 * @throws HBaseDaoException
	 *             if the three-argument overload fails
	 */
	public static void split(String tableNameOrRegionName,
			String hBaseInstanceName) throws HBaseDaoException {
		final String noRowKey = null;
		split(tableNameOrRegionName, noRowKey, hBaseInstanceName);
	}

	/**
	 * Compacts a table or region without restricting it to a column family.
	 * Delegates to {@link #compact(String, String, String)} with a
	 * {@code null} family.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param hBaseInstanceName
	 *            HBase instance name forwarded to the three-argument overload
	 * @throws HBaseDaoException
	 *             if the three-argument overload fails
	 */
	public static void compact(String tableNameOrRegionName,
			String hBaseInstanceName) throws HBaseDaoException {
		final String noFamily = null;
		compact(tableNameOrRegionName, noFamily, hBaseInstanceName);
	}

	/**
	 * Major-compacts a table or region without restricting it to a column
	 * family. Delegates to {@link #majorCompact(String, String, String)} with
	 * a {@code null} family.
	 * 
	 * @param tableNameOrRegionName
	 *            table name or region name
	 * @param hBaseInstanceName
	 *            HBase instance name forwarded to the three-argument overload
	 * @throws HBaseDaoException
	 *             if the three-argument overload fails
	 */
	public static void majorCompact(String tableNameOrRegionName,
			String hBaseInstanceName) throws HBaseDaoException {
		final String noFamily = null;
		majorCompact(tableNameOrRegionName, noFamily, hBaseInstanceName);
	}

	/**
	 * Closes the given admin handle; does nothing when the handle is null
	 * (for example because its construction failed).
	 * 
	 * @param hBaseAdmin
	 *            admin handle to close, may be null
	 * @throws HBaseDaoException
	 *             if closing the handle raises an {@link IOException}
	 */
	private static void closeHBaseAdmin(HBaseAdmin hBaseAdmin)
			throws HBaseDaoException {
		if (!CheckUtil.isNotNull(hBaseAdmin)) {
			return;
		}
		try {
			hBaseAdmin.close();
		} catch (IOException closeFailure) {
			throw new HBaseDaoException(closeFailure);
		}
	}

	/** Private constructor: the class is used through its static methods only. */
	private HBaseHelper() {
	}

}